go channel channel 是 go 语言中并发控制的一大手段,可以通过 channel 来进行协程之间的通信。
channel 的使用 在研究源码之前,复习一下 channel 的基本使用方法。
channel 分为无缓存和有缓存的 channel,其中,
无缓存的 channel 在读取和写入时都会造成阻塞,只有当双方都准备好时,才会持续写入和读取。换句话说,无缓存的 channel 不能同步读写,只能异步读写。
有缓存的 channel 在写入时不会阻塞,也就是说,有缓存的 channel 可以实现同步读写。
可以看以下两个示例:
无缓存的 channel 死锁
1 2 3 var ch = make (chan int )ch <- 2 <-ch
执行上述伪代码会报错:fatal error: all goroutines are asleep - deadlock!
这是因为在第二行,没有缓存的 channel 就会阻塞,等待一个 goroutine 将其数据读出来。负责读取的第三句也在等待读取数据,但是因为前面一直阻塞,所以无法读取到数据,导致死锁。
正确的姿势应该是:
1 2 3 4 5 var ch = make (chan int )go func () { ch <- 2 }() fmt.Println(<-ch)
有缓存的 channel 对于有缓存的例子:
1 2 3 var ch = make (chan int , 1 )ch <- 2 <-ch
协程之间的通信 配合 select 关键字使用,使协程之间通过 channel 通信。下面是一个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 func main () { var wg = sync.WaitGroup{} var ch1 = make (chan int ) var ch2 = make (chan int ) go func () { wg.Add(1 ) fmt.Println("开始协程 1" ) defer func () { fmt.Println("退出协程 1" ) wg.Done() }() for i := 0 ; ; i++ { select { case val := <-ch1: fmt.Println("ch1 接收到数据 " , val) return case ch2 <- i: time.Sleep(time.Second) fmt.Println("发送数据到 ch2, " , i) } } }() go func () { wg.Add(1 ) fmt.Println("开始协程 2" ) defer func () { fmt.Println("退出协程 2" ) wg.Done() }() for { select { case val := <-ch2: fmt.Println("ch2 接收到数据 " , val) if val == 3 { ch1 <- val return } } } }() time.Sleep(time.Second) wg.Wait() }
输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 开始协程 1 开始协程 2 ch2 接收到数据 0 发送数据到 ch2, 0 ch2 接收到数据 1 发送数据到 ch2, 1 ch2 接收到数据 2 发送数据到 ch2, 2 ch2 接收到数据 3 发送数据到 ch2, 3 退出协程 2 ch1 接收到数据 3 退出协程 1
channel 的实现 在 runtime/chan.go
文件中,可以看到 channel 具体实现。
数据结构 hchan 1 2 3 4 5 6 7 8 9 10 11 12 13 14 type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 closed uint32 elemtype *_type sendx uint recvx uint recvq waitq sendq waitq lock mutex }
waitq 1 2 3 4 type waitq struct { first *sudog last *sudog }
sudog 这个结构体在前面讲 sema 的文章中有所提及。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 type sudog struct { g *g next *sudog prev *sudog elem unsafe.Pointer acquiretime int64 releasetime int64 ticket uint32 isSelect bool success bool parent *sudog waitlink *sudog waittail *sudog c *hchan }
通过上述结构体可以看出,channel 主要是由 一个循环队列 + 两个链表 实现的。
初始化 常见的 channel 初始化方法是:
1 var ch = make (chan type )
当我们调用 make(chan type)
时,对应的源码是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 func makechan (t *chantype, size int ) *hchan { elem := t.elem if elem.size >= 1 <<16 { throw("makechan: invalid channel element type" ) } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment" ) } mem, overflow := math.MulUintptr(elem.size, uintptr (size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic (plainError("makechan: size out of range" )) } var c *hchan switch { case mem == 0 : c = (*hchan)(mallocgc(hchanSize, nil , true )) c.buf = c.raceaddr() case elem.ptrdata == 0 : c = (*hchan)(mallocgc(hchanSize+mem, nil , true )) c.buf = add(unsafe.Pointer(c), hchanSize) default : c = new (hchan) c.buf = mallocgc(mem, elem, true ) } c.elemsize = uint16 (elem.size) c.elemtype = elem c.dataqsiz = uint (size) lockInit(&c.lock, lockRankHchan) if debugChan { print ("makechan: chan=" , c, "; elemsize=" , elem.size, "; dataqsiz=" , size, "\n" ) } return c }
发送数据 对于发送,可以看 chansend 代码。
在此之前,讲一下两个函数:
gopark,将当前 goroutine 变为待唤醒状态
goready,将当前 goroutine 唤醒
下面代码将一些无关紧要的去掉了(比如竟态检测,debug 打印
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 func chansend (c *hchan, ep unsafe.Pointer, block bool , callerpc uintptr ) bool { if c == nil { if !block { return false } gopark(nil , nil , waitReasonChanSendNilChan, traceEvGoStop, 2 ) throw("unreachable" ) } if !block && c.closed == 0 && full(c) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic (plainError("send on closed channel" )) } if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func () { unlock(&c.lock) }, 3 ) return true } if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) gp.parkingOnChan.Store(true ) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2 ) KeepAlive(ep) if mysg != gp.waiting { throw("G waiting list is corrupted" ) } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2 ) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup" ) } panic (plainError("send on closed channel" )) } return true }
从上述代码中可以看出,发送数据分为有缓冲区和无缓冲区的情况。
有缓冲区时,如果缓冲区还有空间,则将发送的数据和各种信息放进缓冲区中。否则,与无缓冲区一样处理
无缓冲区或者缓冲区已满,那么就会将 goroutine 放入等待队列中,阻塞等待。
接收数据 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 func chanrecv (c *hchan, ep unsafe.Pointer, block bool ) (selected, received bool ) { if c == nil { if !block { return } gopark(nil , nil , waitReasonChanReceiveNilChan, traceEvGoStop, 2 ) throw("unreachable" ) } if !block && empty(c) { if atomic.Load(&c.closed) == 0 { return } if empty(c) { if ep != nil { typedmemclr(c.elemtype, ep) } return true , false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 { if c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true , false } } else { if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func () { unlock(&c.lock) }, 3 ) return true , true } } if c.qcount > 0 { qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true , true } if !block { unlock(&c.lock) return false , false } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) gp.parkingOnChan.Store(true ) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2 ) if mysg != gp.waiting { throw("G waiting list is corrupted" ) } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2 ) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true , success }
从添加的注释可以看到,处理接收数据的函数大概情况是:
channel 为空,挂起并返回
channel 关闭,且无数据在缓冲区中,返回
有缓冲区,缓冲区中有数据,直接拿数据
无缓冲区,或者缓冲区中无数据,但有等待发送数据的 goroutine,拿到数据并返回
无缓冲区,缓冲区中无数据,无等待发送数据的 goroutine,挂起接收的这个 goroutine,阻塞等待数据接收。
关闭 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 func closechan (c *hchan) { if c == nil { panic (plainError("close of nil channel" )) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic (plainError("close of closed channel" )) } c.closed = 1 var glist gList for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3 ) } }
Reference 图解Golang channel源码 - 掘金 (juejin.cn)
golang channel 源码剖析 - 知乎 (zhihu.com)